#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <dirent.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "limits.h"
#include "adt.h"
#include "ynet_rpc.h"
#include "sysy_lib.h"
#include "cluster.h"
#include "chunk.h"
#include "bmap.h"
#include "metadata.h"
#include "../replica/replica.h"
#include "../chunk/chunk_proto.h"
#include "net_table.h"
#include "conn.h"
#include "configure.h"
#include "net_global.h"
#include "../storage/stor_rpc.h"
#include "../controller/stor_ctl.h"
#include "md_map.h"
#include "md_root.h"
#include "md_parent.h"
#include "lich_md.h"
#include "ylog.h"
#include "dbg.h"

/*
 * Run the getpool request for chkid against node nid.
 *
 * The in-process controller (stor_ctl_getpool) is used when
 * net_islocalcall(nid) is true; otherwise the request goes out through
 * stor_rpc_getpool. pool is handed to the callee unchanged.
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_getpool(const nid_t *nid, const fileid_t *chkid, char *pool)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_getpool(chkid, pool);
        else
                ret = stor_rpc_getpool(nid, chkid, pool);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Public getpool entry for chkid.
 *
 * The target node comes from md_map_getsrv(). When the request fails with
 * EREMCHG or ETIMEDOUT the map entry is dropped via md_map_drop() and the
 * lookup + request sequence is repeated exactly once; every other failure
 * (or a second failure) is returned to the caller.
 */
int md_getpool(const fileid_t *chkid, char *pool)
{
        int ret, retried = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(chkid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_getpool(&srv, chkid, pool);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(chkid, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Run the getattr request for chkid against node nid, filling fileinfo.
 *
 * Local calls (net_islocalcall(nid) true) go to stor_ctl_getattr, remote
 * ones to stor_rpc_getattr.
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_getattr(const nid_t *nid, const fileid_t *chkid, fileinfo_t *fileinfo)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_getattr(chkid, fileinfo);
        else
                ret = stor_rpc_getattr(nid, chkid, fileinfo);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Public getattr entry for chkid.
 *
 * Resolves the target node with md_map_getsrv() and issues the request.
 * On EREMCHG or ETIMEDOUT the map entry is dropped and the sequence is
 * repeated once; any other error, or a repeated error, is returned.
 */
int md_getattr(const fileid_t *chkid, fileinfo_t *fileinfo)
{
        int ret, retried = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(chkid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_getattr(&srv, chkid, fileinfo);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(chkid, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Run the stat request for chkid against node nid, filling filestat.
 *
 * Both back ends are called with the same trailing arguments
 * (0, STOR_STAT_WHOLLY). stor_ctl_stat is used for local calls,
 * stor_rpc_stat otherwise.
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_stat(const nid_t *nid, const fileid_t *chkid, filestat_t *filestat)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_stat(chkid, filestat, 0, STOR_STAT_WHOLLY);
        else
                ret = stor_rpc_stat(nid, chkid, filestat, 0, STOR_STAT_WHOLLY);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Public stat entry for chkid.
 *
 * Resolves the target node with md_map_getsrv() and issues the request.
 * On EREMCHG or ETIMEDOUT the map entry is dropped and the sequence is
 * repeated once; any other error, or a repeated error, is returned.
 */
int md_stat(const fileid_t *chkid, filestat_t *filestat)
{
        int ret, retried = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(chkid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_stat(&srv, chkid, filestat);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(chkid, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Run the check_ready request for chkid against node nid.
 *
 * stor_ctl_check_ready is used when net_islocalcall(nid) is true,
 * stor_rpc_check_ready otherwise.
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_check_ready(const nid_t *nid, const fileid_t *chkid)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_check_ready(chkid);
        else
                ret = stor_rpc_check_ready(nid, chkid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Public check_ready entry for chkid.
 *
 * Resolves the target node with md_map_getsrv() and issues the request.
 * On EREMCHG or ETIMEDOUT the map entry is dropped and the sequence is
 * repeated once; any other error, or a repeated error, is returned.
 */
int md_check_ready(const fileid_t *chkid)
{
        int ret, retried = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(chkid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_check_ready(&srv, chkid);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(chkid, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Run the setattr request for chkid against node nid.
 *
 * setattr carries the requested changes and fileinfo is passed to the
 * callee as a non-const pointer. stor_ctl_setattr handles local calls,
 * stor_rpc_setattr remote ones.
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_setattr(const nid_t *nid, const fileid_t *chkid, fileinfo_t *fileinfo, const setattr_t *setattr)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_setattr(chkid, fileinfo, setattr);
        else
                ret = stor_rpc_setattr(nid, chkid, fileinfo, setattr);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Public setattr entry for chkid.
 *
 * @param chkid      object whose attributes are changed
 * @param _fileinfo  optional output; when non-NULL it receives a copy of the
 *                   fileinfo produced by the request
 * @param setattr    requested attribute changes
 * @return 0 on success, otherwise an error code
 *
 * Error handling:
 *  - EREMCHG / ETIMEDOUT: the map entry is dropped and the node is looked up
 *    again, at most once.
 *  - EAGAIN: the request is re-sent to the same node through USLEEP_RETRY
 *    (presumably up to 10 attempts, 100ms apart -- confirm against the
 *    macro definition).
 *  - anything else is returned to the caller.
 */
int md_setattr(const fileid_t *chkid, fileinfo_t *_fileinfo, const setattr_t *setattr)
{
        int ret, retry = 0, retry1 = 0;
        nid_t nid;
        fileinfo_t fileinfo;

#if !ENABLE_REPLICA_FALLBACK
        /* 'online' is filled by conn_faultdomain() but not used below */
        int total, online;

        if (!cluster_is_solomode()) {
                ret = conn_faultdomain(&total, &online);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* refuse a replica setting where val + 1 exceeds the fault-domain total */
                if (setattr->replica.set_it) {
                        if (setattr->replica.val + 1 > total) {
                                DWARN("replica %u, total %u\n", setattr->replica.val, total);
                                ret = EPERM;
                                GOTO(err_ret, ret);
                        }
                }
        }
#endif

retry:
        ret = md_map_getsrv(chkid, &nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

retry1:
        /* retry1 re-sends to the same nid; retry re-resolves the node first */
        ret = __md_setattr(&nid, chkid, &fileinfo, setattr);
        if (unlikely(ret)) {
                if ((ret == EREMCHG || ret == ETIMEDOUT) && retry == 0) {
                        md_map_drop(chkid, &nid);
                        retry = 1;
                        goto retry;
                } else if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry1, retry1, 10, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        /* the result is copied out only on success and only if requested */
        if (_fileinfo)
                *_fileinfo = fileinfo;
        //md_map_update(&chkinfo->id, &chkinfo->diskid[0]);

        return 0;
err_ret:
        return ret;
}

/*
 * Run the mkpool request (create `name` under `parent`) against node nid.
 *
 * site_name and setattr are forwarded untouched; chkinfo is passed to the
 * callee as a non-const pointer. stor_ctl_mkpool handles local calls,
 * stor_rpc_mkpool remote ones.
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_mkpool(const nid_t *nid, const fileid_t *parent,
                      const char *name, const char *site_name,
                      const setattr_t *setattr, chkinfo_t *chkinfo)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_mkpool(parent, name, site_name, setattr, chkinfo);
        else
                ret = stor_rpc_mkpool(nid, parent, name, site_name, setattr, chkinfo);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Make sure setattr carries a replica value, inheriting it from the parent
 * when the caller did not set one.
 *
 * If setattr->replica.set_it is already non-zero nothing is changed.
 * Otherwise the parent's attributes are read with md_getattr() and
 * replica.val becomes:
 *   - fileinfo.ec.m        when `type` is a regular file and the parent's
 *                          ec.plugin is not PLUGIN_NULL,
 *   - fileinfo.repnum_usr  in every other case.
 * The parent's repnum_usr is asserted to be non-zero.
 *
 * Returns 0 on success, or the md_getattr() error code.
 */
STATIC int __md_check_replica(const fileid_t *parent, setattr_t *setattr, uint32_t type)
{
        int ret;
        fileinfo_t pinfo;

        /* caller made an explicit choice: keep it */
        if (setattr->replica.set_it)
                return 0;

        ret = md_getattr(parent, &pinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(pinfo.repnum_usr);

        if (S_ISREG(type) && pinfo.ec.plugin != PLUGIN_NULL)
                setattr->replica.val = pinfo.ec.m;
        else
                setattr->replica.val = pinfo.repnum_usr;

        setattr->replica.set_it = 1;

        return 0;
err_ret:
        return ret;
}

/*
 * Create pool `name` under `parent`, passing `site_name` through.
 *
 * A private copy of *_setattr is completed by __md_check_replica() (the
 * replica value is inherited from the parent when unset) before the request
 * is sent to the node returned by md_map_getsrv(parent). On EREMCHG or
 * ETIMEDOUT the map entry for parent is dropped and the request is retried
 * once. On success the map is updated with the new chunk's id and the id of
 * its first diskid entry.
 *
 * Returns 0 on success, otherwise an error code.
 */
int md_mkpool_with_area(const fileid_t *parent, const char *name,
                        const char *site_name, const setattr_t *_setattr, chkinfo_t *chkinfo)
{
        int ret, retried = 0;
        nid_t srv;
        setattr_t attr;

        memcpy(&attr, _setattr, sizeof(attr));

        ret = __md_check_replica(parent, &attr, __S_IFDIR);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (;;) {
                ret = md_map_getsrv(parent, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_mkpool(&srv, parent, name, site_name, &attr, chkinfo);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(parent, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        md_map_update(&chkinfo->id, &chkinfo->diskid[0].id);

        return 0;
err_ret:
        return ret;
}

/*
 * Create pool `name` under `parent` with an empty site name.
 *
 * Thin wrapper around md_mkpool_with_area(); returns its result.
 */
int md_mkpool(const fileid_t *parent, const char *name, const setattr_t *_setattr, chkinfo_t *chkinfo)
{
        int ret = md_mkpool_with_area(parent, name, "", _setattr, chkinfo);

        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Issue the extend_pool request for poolid.
 *
 * The target node comes from md_map_getsrv(); stor_ctl_extend_pool is used
 * for local calls and stor_rpc_extend_pool otherwise. Unlike most entries in
 * this file there is no retry: the first error is returned as is.
 */
int md_extend_pool(const fileid_t *poolid)
{
        int ret;
        nid_t srv;

        ret = md_map_getsrv(poolid, &srv);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (net_islocalcall(&srv))
                ret = stor_ctl_extend_pool(poolid);
        else
                ret = stor_rpc_extend_pool(&srv, poolid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Run the mkvol request (create `name` under `parent`) against node nid.
 *
 * stor_ctl_mkvol handles local calls, stor_rpc_mkvol remote ones; the
 * remaining arguments are forwarded untouched.
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_create(const nid_t *nid, const fileid_t *parent, const char *name,
                const char *site_name, const setattr_t *setattr, chkinfo_t *chkinfo)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_mkvol(parent, name, site_name, setattr, chkinfo);
        else
                ret = stor_rpc_mkvol(nid, parent, name, site_name, setattr, chkinfo);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Create volume `name` under `parent`.
 *
 * A private copy of *_setattr is completed by __md_check_replica() before
 * the request is sent to the node returned by md_map_getsrv(parent). On
 * EREMCHG or ETIMEDOUT the map entry for parent is dropped and the request
 * is retried once. On success both the node map and the parent map are
 * updated for the new chunk.
 *
 * Returns 0 on success, otherwise an error code.
 */
int md_mkvol(const fileid_t *parent, const char *name, const char *site_name, const setattr_t *_setattr,
              chkinfo_t *chkinfo)
{
        int ret, retried = 0;
        nid_t srv;
        setattr_t attr;

        memcpy(&attr, _setattr, sizeof(attr));

        ret = __md_check_replica(parent, &attr, __S_IFREG);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (;;) {
                ret = md_map_getsrv(parent, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_create(&srv, parent, name, site_name, &attr, chkinfo);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(parent, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        md_map_update(&chkinfo->id, &chkinfo->diskid[0].id);
        md_parent_update(&chkinfo->id, parent);

        return 0;
err_ret:
        return ret;
}

/*
 * Run the mkvolwith request (create `name` under `parent` from an existing
 * chkinfo) against node nid.
 *
 * stor_ctl_mkvolwith handles local calls, stor_rpc_mkvolwith remote ones.
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_createwith(const nid_t *nid, const fileid_t *parent,
                           const char *name, const chkinfo_t *chkinfo)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_mkvolwith(parent, name, chkinfo);
        else
                ret = stor_rpc_mkvolwith(nid, parent, name, chkinfo);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Create volume `name` under `parent` using the caller-supplied chkinfo.
 *
 * The request goes to the node returned by md_map_getsrv(parent). On
 * EREMCHG or ETIMEDOUT the map entry for parent is dropped and the request
 * is retried once. On success both the node map and the parent map are
 * updated for chkinfo->id.
 *
 * Returns 0 on success, otherwise an error code.
 */
int md_mkvolwith(const fileid_t *parent, const char *name, const chkinfo_t *chkinfo)
{
        int ret, retried = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(parent, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_createwith(&srv, parent, name, chkinfo);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(parent, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        md_map_update(&chkinfo->id, &chkinfo->diskid[0].id);
        md_parent_update(&chkinfo->id, parent);

        return 0;
err_ret:
        return ret;
}

/*
 * Run the listpool_open request for dirid / uuid against node nid.
 *
 * stor_ctl_listpool_open handles local calls, stor_rpc_listpool_open
 * remote ones.
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_listpool_open(const nid_t *nid, const fileid_t *dirid,
                const char *uuid)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_listpool_open(dirid, uuid);
        else
                ret = stor_rpc_listpool_open(nid, dirid, uuid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Run the listpool request for dirid / uuid against node nid.
 *
 * offset, de and delen are forwarded untouched to stor_ctl_listpool
 * (local call) or stor_rpc_listpool (remote call).
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_listpool(const nid_t *nid, const fileid_t *dirid,
                const char *uuid, uint64_t offset, void *de, int *delen)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_listpool(dirid, uuid, offset, de, delen);
        else
                ret = stor_rpc_listpool(nid, dirid, uuid, offset, de, delen);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Run the listpool_close request for dirid / uuid against node nid.
 *
 * stor_ctl_listpool_close handles local calls, stor_rpc_listpool_close
 * remote ones.
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_listpool_close(const nid_t *nid, const fileid_t *dirid,
                const char *uuid)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_listpool_close(dirid, uuid);
        else
                ret = stor_rpc_listpool_close(nid, dirid, uuid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Report whether the pool dirid contains any entry besides "." and "..".
 *
 * @param dirid  pool to inspect; asserted to be of type __POOL_CHUNK__
 * @param empty  output: set to 1 when no real entry is found in the first
 *               listing batch, 0 otherwise. Written only after the listing
 *               call succeeded.
 * @return 0 on success, otherwise the error from the open/list calls
 *
 * Only one batch (offset 0, at most BIG_BUF_LEN bytes) is read, which is
 * enough because the scan stops at the first real entry.
 */
int md_isempty(const fileid_t *dirid, int *empty)
{
        int ret, delen, retry = 0;
        struct dirent *de;
        char *de0;//[BIG_BUF_LEN] = {};
        uint64_t offset = 0;
        char uuid[MAX_NAME_LEN] = {};

        YASSERT(dirid->type == __POOL_CHUNK__);

        /* scratch buffer for the listing; allocation failure is not handled
         * gracefully (UNIMPLEMENTED presumably aborts -- confirm) */
        ret = ymalloc((void **)&de0, BIG_BUF_LEN);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        /* the uuid identifies this open/list/close sequence to the callee */
        get_uuid(uuid);
        ret = md_listpool_open(dirid, uuid);
        if (ret)
                GOTO(err_ret, ret);

retry:
        /* delen is in/out: reset it to the buffer size before every attempt */
        delen = BIG_BUF_LEN;
        ret = md_listpool(dirid, uuid, 0, de0, &delen);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        /* presumably up to 50 attempts, 100ms apart -- confirm
                         * against the USLEEP_RETRY definition */
                        USLEEP_RETRY(err_close, ret, retry, retry, 50, (100 * 1000));
                } else
                        GOTO(err_close, ret);
        }

        *empty = 1;
        if (delen == 0)
                goto out;

        dir_for_each(de0, delen, de, offset) {
                if (strcmp(de->d_name, ".") == 0
                                || strcmp(de->d_name, "..") == 0)
                        continue;

                /* an entry with an empty name ends the scan with *empty still 1 */
                if (strlen(de->d_name) == 0)
                        goto out;

                *empty = 0;
                break;
        }

out:
        md_listpool_close(dirid, uuid);
        yfree((void **)&de0);
        return 0;
err_close:
        md_listpool_close(dirid, uuid);
err_ret:
        yfree((void **)&de0);
        return ret;
}

/*
 * Public listpool_open entry for dirid / uuid.
 *
 * Resolves the target node with md_map_getsrv() and issues the request.
 * On EREMCHG or ETIMEDOUT the map entry is dropped and the sequence is
 * repeated once; any other error, or a repeated error, is returned.
 */
int md_listpool_open(const fileid_t *dirid, const char *uuid)
{
        int ret, retried = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(dirid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_listpool_open(&srv, dirid, uuid);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(dirid, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Public listpool entry for dirid / uuid.
 *
 * offset, de and delen are forwarded untouched. The target node is resolved
 * with md_map_getsrv(); on EREMCHG or ETIMEDOUT the map entry is dropped and
 * the sequence is repeated once. Any other error, or a repeated error, is
 * returned.
 */
int md_listpool(const fileid_t *dirid, const char *uuid, uint64_t offset, void *de, int *delen)
{
        int ret, retried = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(dirid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_listpool(&srv, dirid, uuid, offset, de, delen);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(dirid, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Public listpool_close entry for dirid / uuid.
 *
 * Resolves the target node with md_map_getsrv() and issues the request.
 * On EREMCHG or ETIMEDOUT the map entry is dropped and the sequence is
 * repeated once; any other error, or a repeated error, is returned.
 */
int md_listpool_close(const fileid_t *dirid, const char *uuid)
{
        int ret, retried = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(dirid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_listpool_close(&srv, dirid, uuid);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(dirid, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Run the lookup request for `name` under `parent` against node nid,
 * filling chkinfo.
 *
 * stor_ctl_lookup handles local calls, stor_rpc_lookup remote ones.
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_lookup(const nid_t *nid, const fileid_t *parent,
                       const char *name, chkinfo_t *chkinfo)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_lookup(parent, name, chkinfo);
        else
                ret = stor_rpc_lookup(nid, parent, name, chkinfo);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Look up `name` under `parent` and fill chkinfo.
 *
 * The request goes to the node returned by md_map_getsrv(parent). On
 * EREMCHG or ETIMEDOUT the map entry is dropped and the sequence is
 * repeated once. On success, the node that answered is stored in
 * *parentnid when parentnid is non-NULL. No map updates are performed here.
 *
 * Returns 0 on success, otherwise an error code.
 */
int md_lookup_byname1(const fileid_t *parent, const char *name,
                     chkinfo_t *chkinfo, nid_t *parentnid)
{
        int ret, retried = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(parent, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_lookup(&srv, parent, name, chkinfo);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(parent, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        if (parentnid)
                *parentnid = srv;

        return 0;
err_ret:
        return ret;
}

/*
 * Look up `name` under `parent`, then run md_chunk_online() on the result.
 *
 * Both steps share a single retry budget: the first EREMCHG / ETIMEDOUT
 * from either step restarts the sequence from the node lookup, and any
 * later failure is returned. The map entry for parent is dropped only when
 * the lookup step fails; a failure of md_chunk_online() restarts without
 * dropping it.
 *
 * On success the node map and the parent map are updated for chkinfo->id,
 * and the answering node is stored in *parentnid when parentnid is non-NULL.
 *
 * Returns 0 on success, otherwise an error code.
 */
int md_lookup_byname(const char *pool, const fileid_t *parent, const char *name,
                     chkinfo_t *chkinfo, nid_t *parentnid)
{
        int ret, retried = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(parent, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_lookup(&srv, parent, name, chkinfo);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(parent, &srv);
                        retried = 1;
                        continue;
                }

                ret = md_chunk_online(pool, parent, chkinfo);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        /* restart without dropping the map entry for parent */
                        retried = 1;
                        continue;
                }

                break;
        }

        md_map_update(&chkinfo->id, &chkinfo->diskid[0].id);
        md_parent_update(&chkinfo->id, parent);

        if (parentnid)
                *parentnid = srv;

        return 0;
err_ret:
        return ret;
}


/*
 * Ask node nid for the parent of chkid, storing it in *parent.
 *
 * Local calls use replica_srv_getparent() (its third argument is passed as
 * NULL); remote calls use replica_rpc_getparent(). There is no map lookup
 * and no retry: the caller chooses the node.
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
int md_getparent(const nid_t *nid, const chkid_t *chkid, chkid_t *parent)
{
        int ret;

        if (net_islocalcall(nid))
                ret = replica_srv_getparent(chkid, parent, NULL);
        else
                ret = replica_rpc_getparent(nid, chkid, parent);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Run the rmpool request (remove `name` under fileid) against node nid.
 *
 * stor_ctl_rmpool handles local calls, stor_rpc_rmpool remote ones.
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_rmpool(const nid_t *nid, const fileid_t *fileid, const char *name)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_rmpool(fileid, name);
        else
                ret = stor_rpc_rmpool(nid, fileid, name);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Public rmpool entry: remove `name` under fileid.
 *
 * Resolves the target node with md_map_getsrv() and issues the request.
 * On EREMCHG or ETIMEDOUT the map entry is dropped and the sequence is
 * repeated once; any other error, or a repeated error, is returned.
 */
int md_rmpool(const fileid_t *fileid, const char *name)
{
        int ret, retried = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(fileid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_rmpool(&srv, fileid, name);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(fileid, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Run the rmvol request (remove `name` under fileid) against node nid.
 *
 * `force` is forwarded untouched. stor_ctl_rmvol handles local calls,
 * stor_rpc_rmvol remote ones.
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_rmvol(const nid_t *nid, const fileid_t *fileid,
                const char *name, int force)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_rmvol(fileid, name, force);
        else
                ret = stor_rpc_rmvol(nid, fileid, name, force);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Remove volume `name` under fileid.
 *
 * When force is 1 the removal is forced; a forced removal only deletes the
 * record from the pool table.
 * issues:10314
 *
 * The request goes to the node returned by md_map_getsrv(fileid). On
 * EREMCHG or ETIMEDOUT the map entry is dropped and the sequence is
 * repeated once.
 *
 * @Param fileid  parent object holding the volume entry
 * @Param name    name of the volume to remove
 * @Param force   forwarded to the back end; see above
 *
 * @Returns 0 on success, otherwise an error code
 */
int md_rmvol(const fileid_t *fileid, const char *name, int force)
{
        int ret, retried = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(fileid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_rmvol(&srv, fileid, name, force);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(fileid, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Run the rename_lock request against node nid.
 *
 * fileid, src, name and force are forwarded untouched to
 * stor_ctl_rename_lock (local call) or stor_rpc_rename_lock (remote call).
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_rename_lock(const nid_t *nid, const fileid_t *fileid,
                            const fileid_t *src, const char *name, int force)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_rename_lock(fileid, src, name, force);
        else
                ret = stor_rpc_rename_lock(nid, fileid, src, name, force);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}


/*
 * Public rename_lock entry.
 *
 * The request goes to the node returned by md_map_getsrv(fileid). On
 * EREMCHG or ETIMEDOUT the map entry is dropped and the sequence is
 * repeated once; any other error, or a repeated error, is returned.
 */
int md_rename_lock(const fileid_t *fileid, const fileid_t *src, const char *name, int force)
{
        int ret, retried = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(fileid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_rename_lock(&srv, fileid, src, name, force);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(fileid, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Run the rename_unlock request against node nid.
 *
 * fileid and chkinfo are forwarded untouched to stor_ctl_rename_unlock
 * (local call) or stor_rpc_rename_unlock (remote call).
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_rename_unlock(const nid_t *nid, const fileid_t *fileid, const chkinfo_t *chkinfo)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_rename_unlock(fileid, chkinfo);
        else
                ret = stor_rpc_rename_unlock(nid, fileid, chkinfo);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}


/*
 * Public rename_unlock entry.
 *
 * The request goes to the node returned by md_map_getsrv(fileid). On
 * EREMCHG or ETIMEDOUT the map entry is dropped and the sequence is
 * repeated once; any other error, or a repeated error, is returned.
 */
int md_rename_unlock(const fileid_t *fileid, const chkinfo_t *chkinfo)
{
        int ret, retried = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(fileid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_rename_unlock(&srv, fileid, chkinfo);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(fileid, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Run the rename request (from/fname -> to/tname) against node nid.
 *
 * stor_ctl_rename handles local calls, stor_rpc_rename remote ones.
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_rename(const nid_t *nid, const fileid_t *from, const char *fname,
              const fileid_t *to, const char *tname)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_rename(from, fname, to, tname);
        else
                ret = stor_rpc_rename(nid, from, fname, to, tname);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}


/*
 * Public rename entry: from/fname -> to/tname.
 *
 * The request goes to the node returned by md_map_getsrv(from), i.e. the
 * node is chosen by the source side. On EREMCHG or ETIMEDOUT the map entry
 * for `from` is dropped and the sequence is repeated once; any other error,
 * or a repeated error, is returned.
 */
int md_rename(const fileid_t *from, const char *fname,
              const fileid_t *to, const char *tname)
{
        int ret, retried = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(from, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_rename(&srv, from, fname, to, tname);
                if (unlikely(ret)) {
                        if (retried || (ret != EREMCHG && ret != ETIMEDOUT))
                                GOTO(err_ret, ret);

                        md_map_drop(from, &srv);
                        retried = 1;
                        continue;
                }

                break;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Run the xattr_get request for chkid / key against node nid.
 *
 * value and valuelen are forwarded untouched to stor_ctl_xattr_get (local
 * call) or stor_rpc_xattr_get (remote call).
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
STATIC int __md_xattr_get(const nid_t *nid, const chkid_t *chkid, const char *key,
                char *value, int *valuelen)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_xattr_get(chkid, key, value, valuelen);
        else
                ret = stor_rpc_xattr_get(nid, chkid, key, value, valuelen);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Public xattr_get entry for chkid / key.
 *
 * @param _nid      node to contact, or NULL to resolve it via md_map_getsrv()
 * @param chkid     object whose attribute is read
 * @param key       attribute name
 * @param value     forwarded to the back end (presumably the output buffer)
 * @param valuelen  forwarded to the back end
 * @return 0 on success, otherwise an error code
 *
 * On the first EREMCHG / ETIMEDOUT the map entry for the contacted node is
 * dropped and control jumps to the `retry` label, which sits inside the
 * `_nid == NULL` branch: the retry therefore always resolves the node via
 * md_map_getsrv(), even when the caller supplied _nid.
 */
int md_xattr_get(const nid_t *_nid, const chkid_t *chkid, const char *key,
                char *value, int *valuelen)
{
        int ret, retry = 0;
        const nid_t *nid;
        nid_t tmp;

        if (_nid == NULL) {
        retry:
                ret = md_map_getsrv(chkid, &tmp);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                nid = &tmp;
        } else
                nid = _nid;

        ret = __md_xattr_get(nid, chkid, key, value, valuelen);
        if (unlikely(ret)) {
                if ((ret == EREMCHG || ret == ETIMEDOUT) && retry == 0) {
                        /* drop the node actually used (caller's or resolved) */
                        md_map_drop(chkid, nid);
                        retry = 1;
                        goto retry;
                } else
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Run the xattr_list request for chkid against node nid.
 *
 * buf and buflen are forwarded untouched to stor_ctl_xattr_list (local
 * call) or stor_rpc_xattr_list (remote call).
 *
 * Returns 0 on success, otherwise the callee's error code.
 */
static int __md_xattr_list(const nid_t *nid, const chkid_t *chkid, char *buf, int *buflen)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_xattr_list(chkid, buf, buflen);
        else
                ret = stor_rpc_xattr_list(nid, chkid, buf, buflen);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Public xattr_list entry for chkid.
 *
 * @param _nid    node to contact, or NULL to resolve it via md_map_getsrv()
 * @param chkid   object whose attributes are listed
 * @param buf     forwarded to the back end (presumably the output buffer)
 * @param buflen  forwarded to the back end
 * @return 0 on success, otherwise an error code
 *
 * On the first EREMCHG the map entry for the contacted node is dropped and
 * control jumps to the `retry` label inside the `_nid == NULL` branch, so
 * the retry always resolves the node via md_map_getsrv().
 */
int md_xattr_list(const nid_t *_nid, const chkid_t *chkid, char *buf, int *buflen)
{
        int ret, retry = 0;
        const nid_t *nid;
        nid_t tmp;

        if (_nid == NULL) {
        retry:
                ret = md_map_getsrv(chkid, &tmp);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                nid = &tmp;
        } else
                nid = _nid;

        ret = __md_xattr_list(nid, chkid, buf, buflen);
        if (unlikely(ret)) {
                if (ret == EREMCHG && retry == 0) {
                        /*
                         * Drop the node that was actually contacted. `tmp`
                         * is still uninitialized when the caller supplied
                         * _nid, so it must not be read here.
                         */
                        md_map_drop(chkid, nid);
                        retry = 1;
                        goto retry;
                } else
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Public xattr_set entry for chkid / key.
 *
 * @param _nid      node to contact, or NULL to resolve it via md_map_getsrv()
 * @param chkid     object whose attribute is written
 * @param key       attribute name
 * @param value     attribute value
 * @param valuelen  forwarded to the back end with value
 * @param flag      forwarded to the back end untouched
 * @return 0 on success, otherwise an error code
 *
 * The request always goes through stor_rpc_xattr_set(); there is no
 * local-call shortcut in this function. On the first EREMCHG the map entry
 * for the contacted node is dropped and control jumps to the `retry` label
 * inside the `_nid == NULL` branch, so the retry always resolves the node
 * via md_map_getsrv().
 */
int md_xattr_set(const nid_t *_nid, const chkid_t *chkid, const char *key,
                   const char *value, int valuelen, int flag)
{
        int ret, retry = 0;
        const nid_t *nid;
        nid_t tmp;

        if (_nid == NULL) {
        retry:
                ret = md_map_getsrv(chkid, &tmp);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                nid = &tmp;
        } else
                nid = _nid;

        ret = stor_rpc_xattr_set(nid, chkid, key, value, valuelen, flag);
        if (unlikely(ret)) {
                if (ret == EREMCHG && retry == 0) {
                        /*
                         * Drop the node that was actually contacted. `tmp`
                         * is still uninitialized when the caller supplied
                         * _nid, so it must not be read here.
                         */
                        md_map_drop(chkid, nid);
                        retry = 1;
                        goto retry;
                } else
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Remove an extended attribute from a chunk via stor_rpc_xattr_remove().
 *
 * @param _nid  node to send the request to; when NULL the node is resolved
 *              with md_map_getsrv()
 * @param chkid chunk to modify
 * @param key   attribute name
 * @return 0 on success, otherwise the error code of the failing call
 *
 * The first EREMCHG failure is retried once: md_map_drop() is called for the
 * node that was used and the node is resolved again with md_map_getsrv(),
 * even when the caller supplied _nid.
 */
int md_xattr_remove(const nid_t *_nid, const chkid_t *chkid, const char *key)
{
        int ret, retried = 0;
        const nid_t *nid = _nid;
        nid_t tmp;

        for (;;) {
                if (nid == NULL) {
                        ret = md_map_getsrv(chkid, &tmp);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        nid = &tmp;
                }

                ret = stor_rpc_xattr_remove(nid, chkid, key);
                if (ret == 0)
                        break;

                if (ret != EREMCHG || retried)
                        GOTO(err_ret, ret);

                /* tmp is only filled in by md_map_getsrv(); when the caller
                 * supplied the node, copy it first so md_map_drop() is never
                 * handed an uninitialized nid_t. */
                if (nid != &tmp)
                        tmp = *nid;

                md_map_drop(chkid, &tmp);
                retried = 1;
                nid = NULL; /* force a fresh md_map_getsrv() lookup */
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Forward a move request for a file to the node that should handle it.
 *
 * @param _nid   node that handles the request; must not be NULL (asserted)
 * @param fileid file the request applies to
 * @param dist   array of nids handed to stor_ctl_move()/stor_rpc_move()
 * @param count  count handed through together with dist
 * @return 0 on success, otherwise the error code of the backend call
 */
int md_move(const nid_t *_nid, const fileid_t *fileid, const nid_t *dist, int count)
{
        int ret;

        YASSERT(_nid);

        /* call the controller directly when _nid is local, otherwise use RPC */
        ret = net_islocal(_nid)
                ? stor_ctl_move(fileid, dist, count)
                : stor_rpc_move(_nid, fileid, dist, count);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Forward a localize request for a file to the node that should handle it.
 *
 * @param _nid   node that handles the request; must not be NULL (asserted)
 * @param fileid file the request applies to
 * @param idx    index handed to stor_ctl_localize()/stor_rpc_localize()
 * @return 0 on success, otherwise the error code of the backend call
 */
int md_localize(const nid_t *_nid, const fileid_t *fileid, int idx)
{
        int ret;

        YASSERT(_nid);

        /* call the controller directly when _nid is local, otherwise use RPC */
        ret = net_islocal(_nid)
                ? stor_ctl_localize(fileid, idx)
                : stor_rpc_localize(_nid, fileid, idx);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Dispatch a snapshot "listopen" request: stor_ctl_snapshot_listopen() when
 * net_islocalcall(nid) holds, stor_rpc_snapshot_listopen() otherwise.
 * Returns 0 on success or the backend's error code.
 */
STATIC int __md_snapshot_listopen(const nid_t *nid, const fileid_t *volid,
                const char *uuid)
{
        int ret;

        ret = net_islocalcall(nid)
                ? stor_ctl_snapshot_listopen(volid, uuid)
                : stor_rpc_snapshot_listopen(nid, volid, uuid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Open a snapshot listing on the node returned by md_map_getsrv(volid).
 *
 * @param volid volume whose snapshots are to be listed
 * @param uuid  identifier handed through to the backend
 * @return 0 on success, otherwise the error code of the failing call
 *
 * The first EREMCHG or ETIMEDOUT failure is retried once after calling
 * md_map_drop() and resolving the node again.
 */
int md_snapshot_listopen(const fileid_t *volid, const char *uuid)
{
        int ret, attempt;
        nid_t srv;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(volid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_snapshot_listopen(&srv, volid, uuid);
                if (ret == 0)
                        break;

                /* anything but a first EREMCHG/ETIMEDOUT is final */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(volid, &srv);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Dispatch a snapshot "list" request: stor_ctl_snapshot_list() when
 * net_islocalcall(nid) holds, stor_rpc_snapshot_list() otherwise.
 * Returns 0 on success or the backend's error code.
 */
STATIC int __md_snapshot_list(const nid_t *nid, const fileid_t *volid,
                const char *uuid, uint64_t offset, void *de, int *delen)
{
        int ret;

        ret = net_islocalcall(nid)
                ? stor_ctl_snapshot_list(volid, uuid, offset, de, delen)
                : stor_rpc_snapshot_list(nid, volid, uuid, offset, de, delen);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Fetch snapshot listing data from the node returned by md_map_getsrv(volid).
 *
 * @param volid  volume whose snapshots are listed
 * @param uuid   identifier handed through to the backend
 * @param offset offset handed through to the backend
 * @param de     output buffer handed through to the backend
 * @param delen  length pointer handed through to the backend
 * @return 0 on success, otherwise the error code of the failing call
 *
 * The first EREMCHG or ETIMEDOUT failure is retried once after calling
 * md_map_drop() and resolving the node again.
 */
int md_snapshot_list(const fileid_t *volid, const char *uuid, uint64_t offset, void *de, int *delen)
{
        int ret, attempt;
        nid_t srv;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(volid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_snapshot_list(&srv, volid, uuid, offset, de, delen);
                if (ret == 0)
                        break;

                /* anything but a first EREMCHG/ETIMEDOUT is final */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(volid, &srv);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Dispatch a snapshot "listclose" request: stor_ctl_snapshot_listclose() when
 * net_islocalcall(nid) holds, stor_rpc_snapshot_listclose() otherwise.
 * Returns 0 on success or the backend's error code.
 */
STATIC int __md_snapshot_listclose(const nid_t *nid, const fileid_t *volid,
                const char *uuid)
{
        int ret;

        ret = net_islocalcall(nid)
                ? stor_ctl_snapshot_listclose(volid, uuid)
                : stor_rpc_snapshot_listclose(nid, volid, uuid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Close a snapshot listing on the node returned by md_map_getsrv(volid).
 *
 * @param volid volume the listing belongs to
 * @param uuid  identifier handed through to the backend
 * @return 0 on success, otherwise the error code of the failing call
 *
 * The first EREMCHG or ETIMEDOUT failure is retried once after calling
 * md_map_drop() and resolving the node again.
 */
int md_snapshot_listclose(const fileid_t *volid, const char *uuid)
{
        int ret, attempt;
        nid_t srv;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(volid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_snapshot_listclose(&srv, volid, uuid);
                if (ret == 0)
                        break;

                /* anything but a first EREMCHG/ETIMEDOUT is final */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(volid, &srv);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Per-volume work item of a group snapshot. md_group_snapshot_create() fills
 * one entry per volume and hands its address to the lock / create / unlock
 * worker threads, which report back through ret, locked, created and sem.
 */
typedef struct {
        int id;                 /* index of this entry in the segs array */
        sem_t sem;              /* posted once by a worker when it finishes */
        const fileid_t *fid;    /* volume this entry refers to */
        int ret;                /* error code stored by a failing worker, else 0 */
        int locked;             /* set to 1 by the lock worker, 0 by the unlock worker */
        uuid_t uuid;            /* handed to the group snapshot lock/unlock/create calls */

        int created;            /* set to 1 once the create worker succeeded */
        const char *name;       /* snapshot name */
        const char *_site;      /* per-volume site string handed to the create call */
} group_seg_t;

/**
 * Run the snapshot check for (fid, name) on the node returned by
 * md_map_getsrv(fid): stor_ctl_snapshot_check() when that node is local,
 * stor_rpc_snapshot_check() otherwise.
 *
 * @param fid  file the snapshot belongs to
 * @param name snapshot name
 * @return 0 on success, otherwise the error code of the failing call
 *
 * The first EREMCHG or ETIMEDOUT failure is retried once after calling
 * md_map_drop() and resolving the node again.
 */
int md_snapshot_check(const fileid_t *fid, const char *name)
{
        int ret, attempt;
        nid_t srv;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(fid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = net_islocal(&srv)
                        ? stor_ctl_snapshot_check(fid, name)
                        : stor_rpc_snapshot_check(&srv, fid, name);
                if (ret == 0)
                        break;

                /* anything but a first EREMCHG/ETIMEDOUT is final */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(fid, &srv);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Query the "isempty" snapshot state of fid on the node returned by
 * md_map_getsrv(fid): stor_ctl_snapshot_isempty() when that node is local,
 * stor_rpc_snapshot_isempty() otherwise.
 *
 * @param fid   file to query
 * @param empty output pointer handed through to the backend
 * @return 0 on success, otherwise the error code of the failing call
 *
 * The first EREMCHG or ETIMEDOUT failure is retried once after calling
 * md_map_drop() and resolving the node again.
 */
int md_snapshot_isempty(const fileid_t *fid, int *empty)
{
        int ret, attempt;
        nid_t srv;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(fid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = net_islocal(&srv)
                        ? stor_ctl_snapshot_isempty(fid, empty)
                        : stor_rpc_snapshot_isempty(&srv, fid, empty);
                if (ret == 0)
                        break;

                /* anything but a first EREMCHG/ETIMEDOUT is final */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(fid, &srv);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Thread entry: take the group snapshot lock for one group_seg_t.
 *
 * arg is the group_seg_t to work on. On success seg->locked is set to 1; on
 * failure the error code is stored in seg->ret. seg->sem is posted exactly
 * once in both cases. EAGAIN from either step goes through USLEEP_RETRY with
 * a limit of 10 and a 100 * 1000 sleep argument.
 */
STATIC void *__group_snapshot_lock_thread(void *arg)
{
        group_seg_t *seg = arg;
        int ret, retry = 0, max = 10;
        nid_t nid;

retry:
        ret = md_map_getsrv(seg->fid, &nid);
        if (unlikely(ret)) {
                if (ret != EAGAIN) {
                        GOTO(err_ret, ret);
                }

                USLEEP_RETRY(err_ret, ret, retry, retry, max, 100 * 1000)
        }

        ret = net_islocal(&nid)
                ? stor_ctl_group_snapshot_lock(seg->fid, &seg->uuid)
                : stor_rpc_group_snapshot_lock(&nid, seg->fid, &seg->uuid);
        if (ret) {
                if (ret != EAGAIN) {
                        GOTO(err_ret, ret);
                }

                USLEEP_RETRY(err_ret, ret, retry, retry, max, 100 * 1000)
        }

        seg->locked = 1;
        goto out;
err_ret:
        seg->ret = ret;
out:
        /* wake the waiter regardless of the outcome */
        sem_post(&seg->sem);
        return NULL;
}

/*
 * Thread entry: release the group snapshot lock for one group_seg_t.
 *
 * arg is the group_seg_t to work on. On success seg->locked is cleared; on
 * failure the error code is stored in seg->ret. seg->sem is posted exactly
 * once in both cases. EAGAIN from either step goes through USLEEP_RETRY with
 * a limit of 10 and a 100 * 1000 sleep argument.
 */
STATIC void *__group_snapshot_unlock_thread(void *arg)
{
        group_seg_t *seg = arg;
        int ret, retry = 0, max = 10;
        nid_t nid;

retry:
        ret = md_map_getsrv(seg->fid, &nid);
        if (unlikely(ret)) {
                if (ret != EAGAIN) {
                        GOTO(err_ret, ret);
                }

                USLEEP_RETRY(err_ret, ret, retry, retry, max, 100 * 1000)
        }

        ret = net_islocal(&nid)
                ? stor_ctl_group_snapshot_unlock(seg->fid, seg->uuid)
                : stor_rpc_group_snapshot_unlock(&nid, seg->fid, seg->uuid);
        if (ret) {
                if (ret != EAGAIN) {
                        GOTO(err_ret, ret);
                }

                USLEEP_RETRY(err_ret, ret, retry, retry, max, 100 * 1000)
        }

        seg->locked = 0;
        goto out;
err_ret:
        seg->ret = ret;
out:
        /* wake the waiter regardless of the outcome */
        sem_post(&seg->sem);
        return NULL;
}

/*
 * Lock every segment in parallel: one detached thread running
 * __group_snapshot_lock_thread() per entry of segs.
 *
 * Returns 0 when every segment reports locked, otherwise the pthread_create()
 * error or the ret of the first (lowest index) segment that failed to lock.
 *
 * The function always waits for every thread it started before returning:
 * the workers write into segs[] and post segs[].sem, so returning early
 * would let them race with the caller's cleanup (and with the lifetime of
 * segs itself). The thread handle is never used afterwards, so a single
 * pthread_t is enough and no array bounded by MAX_GROUP_VOLUMES_NUM is needed.
 */
STATIC int __md_group_snapshot_multi_lock(group_seg_t *segs, int count)
{
        int ret, i, started, err = 0;
        pthread_t th;
        pthread_attr_t ta;

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        for (started = 0; started < count; started++) {
                ret = pthread_create(&th, &ta, __group_snapshot_lock_thread, &segs[started]);
                if (unlikely(ret)) {
                        /* stop spawning, but still collect the running workers */
                        err = ret;
                        break;
                }
        }

        (void) pthread_attr_destroy(&ta);

        for (i = 0; i < started; i++) {
                sem_wait(&segs[i].sem);
                if (segs[i].locked == 0 && err == 0)
                        err = segs[i].ret;
        }

        if (err) {
                ret = err;
                GOTO(err_ret, ret);
        }

        return 0;

err_ret:
        return ret;
}

/*
 * Unlock every locked segment in parallel: one detached thread running
 * __group_snapshot_unlock_thread() per entry that is locked and has a
 * non-null uuid. Entries that need no work get their semaphore posted here
 * so the wait loop below can treat all entries alike.
 *
 * Returns 0 when no segment has a non-zero ret after the wait, otherwise the
 * ret of the last such segment, or the pthread_create() error when a thread
 * could not be started even after going through USLEEP_RETRY.
 *
 * When thread creation fails for good, the segments handled so far are still
 * waited for before returning, because their workers write into segs[] and
 * post segs[].sem. The retry counter is reset for every segment so that an
 * earlier segment's retries do not use up a later segment's budget.
 */
STATIC int __md_group_snapshot_multi_unlock(group_seg_t *segs, int count)
{
        int ret, i, handled, retry;
        pthread_t th;
        pthread_attr_t ta;

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        for (handled = 0; handled < count; handled++) {
                if (!segs[handled].locked || uuid_is_null(segs[handled].uuid)) {
                        sem_post(&segs[handled].sem);
                        continue;
                }

                retry = 0;
        retry:
                ret = pthread_create(&th, &ta, __group_snapshot_unlock_thread, &segs[handled]);
                if (unlikely(ret)) {
                        USLEEP_RETRY(err_wait, ret, retry, retry, 3, 1000);
                }
        }

        (void) pthread_attr_destroy(&ta);

        ret = 0;
        for (i = 0; i < count; i++) {
                sem_wait(&segs[i].sem);
                if (segs[i].ret) {
                        ret = segs[i].ret;
                }
        }
        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_wait:
        /* segs[0..handled) each have exactly one post pending or delivered */
        for (i = 0; i < handled; i++)
                sem_wait(&segs[i].sem);

        (void) pthread_attr_destroy(&ta);
err_ret:
        return ret;
}

/*
 * Create one snapshot of a group snapshot on the node returned by
 * md_map_getsrv(fid): stor_ctl_group_snapshot_create_nolock() when that node
 * is local, stor_rpc_group_snapshot_create_nolock() otherwise.
 *
 * EAGAIN from the lookup, and EAGAIN or ETIMEDOUT from the create call, go
 * through USLEEP_RETRY with a limit of 10 and a 100 * 1000 sleep argument.
 * Returns 0 on success, otherwise the error code of the failing call.
 */
STATIC int __group_snapshot_create_nolock(const fileid_t *fid, const char *name, const char *_site, const uuid_t uuid)
{
        int ret, retry = 0, max = 10;
        nid_t nid;

retry:
        ret = md_map_getsrv(fid, &nid);
        if (unlikely(ret)) {
                if (ret != EAGAIN) {
                        GOTO(err_ret, ret);
                }

                USLEEP_RETRY(err_ret, ret, retry, retry, max, 100 * 1000)
        }

        ret = net_islocal(&nid)
                ? stor_ctl_group_snapshot_create_nolock(fid, name, _site, uuid)
                : stor_rpc_group_snapshot_create_nolock(&nid, fid, name, _site, uuid);
        if (ret) {
                if (ret != EAGAIN && ret != ETIMEDOUT) {
                        GOTO(err_ret, ret);
                }

                USLEEP_RETRY(err_ret, ret, retry, retry, max, 100 * 1000)
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Thread entry: create the snapshot for one group_seg_t.
 *
 * A segment that is not locked, or whose uuid is null, fails with ESTALE.
 * On success seg->created is set to 1; on failure the error code is stored
 * in seg->ret. seg->sem is posted exactly once in both cases.
 */
STATIC void * __group_snapshot_create_nolock_thread(void *arg)
{
        group_seg_t *seg = arg;
        int ret;

        if (seg->locked && !uuid_is_null(seg->uuid)) {
                ret = __group_snapshot_create_nolock(seg->fid, seg->name, seg->_site, seg->uuid);
                if (ret)
                        GOTO(err_ret, ret);
        } else {
                ret = ESTALE;
                GOTO(err_ret, ret);
        }

        seg->created = 1;
        goto out;
err_ret:
        seg->ret = ret;
out:
        /* wake the waiter regardless of the outcome */
        sem_post(&seg->sem);
        return NULL;
}

/*
 * Create the snapshots of all segments in parallel: one detached thread
 * running __group_snapshot_create_nolock_thread() per entry of segs.
 *
 * Returns 0 when no segment has a non-zero ret after the wait, otherwise the
 * pthread_create() error or the ret of the last segment that reported one.
 *
 * The function always waits for every thread it started before returning:
 * the workers write into segs[] and post segs[].sem, so returning early
 * would let them race with the caller's cleanup. The thread handle is never
 * used afterwards, so a single pthread_t is enough.
 */
STATIC int __md_group_snapshot_multi_create_nolock(group_seg_t *segs, int count)
{
        int ret, i, started, spawn_err = 0, work_err = 0;
        pthread_t th;
        pthread_attr_t ta;

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        for (started = 0; started < count; started++) {
                ret = pthread_create(&th, &ta, __group_snapshot_create_nolock_thread, &segs[started]);
                if (unlikely(ret)) {
                        /* stop spawning, but still collect the running workers */
                        spawn_err = ret;
                        break;
                }
        }

        (void) pthread_attr_destroy(&ta);

        for (i = 0; i < started; i++) {
                sem_wait(&segs[i].sem);
                if (segs[i].ret) {
                        work_err = segs[i].ret;
                }
        }

        ret = spawn_err ? spawn_err : work_err;
        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Roll back a group snapshot: call md_snapshot_remove(fid, name, 0) for every
 * segment whose created flag is set. All segments are attempted even if one
 * fails. Returns 0 when every removal succeeded, otherwise the error code of
 * the last removal that failed.
 */
STATIC int __md_group_snapshot_remove_all(group_seg_t *segs, int count)
{
        int ret, idx, last_err = 0;

        for (idx = 0; idx < count; idx++) {
                if (!segs[idx].created)
                        continue;

                ret = md_snapshot_remove(segs[idx].fid, segs[idx].name, 0);
                if (ret)
                        last_err = ret;
        }

        if (last_err) {
                ret = last_err;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int md_group_snapshot_create(const fileid_t *fids, const char *name, int count, const char **_site)
{
        int ret, idx;
        group_seg_t segs[MAX_GROUP_VOLUMES_NUM];

        memset(segs, 0x0, sizeof(segs));
        for (idx = 0; idx < count; idx++) {
                segs[idx].fid = fids + idx;
                segs[idx].ret = 0;
                segs[idx].locked = 0;
                sem_init(&segs[idx].sem, 0, 0);
                uuid_clear(segs[idx].uuid);

                segs[idx].id = idx;
                segs[idx].created = 0;
                segs[idx].name = name;
                segs[idx]._site = _site[idx];
        }

        ret = __md_group_snapshot_multi_lock(segs, count);
        if (ret)
                GOTO(err_unlock, ret);

        ret = __md_group_snapshot_multi_create_nolock(segs, count);
        if (ret)
                GOTO(err_remove, ret);

        ret = __md_group_snapshot_multi_unlock(segs, count);
        if (ret)
                GOTO(err_remove, ret);

        return 0;
err_remove:
        __md_group_snapshot_multi_unlock(segs, count);
        __md_group_snapshot_remove_all(segs, count);
        return ret;
err_unlock:
        __md_group_snapshot_multi_unlock(segs, count);
        return ret;
}

/*
 * Dispatch a snapshot create request: stor_ctl_snapshot_create() when
 * net_islocalcall(nid) holds, stor_rpc_snapshot_create() otherwise.
 * Returns 0 on success or the backend's error code.
 */
STATIC int __md_snapshot_create(const nid_t *nid, const chkid_t *parent, const char *name, int p, const char *_site)
{
        int ret;

        ret = net_islocalcall(nid)
                ? stor_ctl_snapshot_create(parent, name, p, _site)
                : stor_rpc_snapshot_create(nid, parent, name, p, _site);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Create a snapshot of parent on the node returned by md_map_getsrv(parent).
 *
 * @param parent file to snapshot
 * @param name   snapshot name
 * @param p      integer argument handed through to the backend
 * @param _site  site string handed through to the backend
 * @return 0 on success, otherwise the error code of the failing call
 *
 * The first EREMCHG or ETIMEDOUT failure is retried once after calling
 * md_map_drop() and resolving the node again.
 */
int md_snapshot_create_with_area(const fileid_t *parent, const char *name, int p, const char *_site)
{
        int ret, attempt;
        nid_t srv;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(parent, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_snapshot_create(&srv, parent, name, p, _site);
                if (ret == 0)
                        break;

                /* anything but a first EREMCHG/ETIMEDOUT is final */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(parent, &srv);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Create a snapshot of parent with an empty site string; shorthand for
 * md_snapshot_create_with_area(). Note the different argument order.
 *
 * @return whatever md_snapshot_create_with_area() returns
 */
int md_snapshot_create(const fileid_t *parent, int p, const char *name)
{
        const char *site = "";

        return md_snapshot_create_with_area(parent, name, p, site);
}

/*
 * Dispatch a snapshot rollback request: stor_ctl_snapshot_rollback() when
 * net_islocalcall(nid) holds, stor_rpc_snapshot_rollback() otherwise.
 * Returns 0 on success or the backend's error code.
 */
STATIC int __md_snapshot_rollback(const nid_t *nid, const fileid_t *fileid, const char *name)
{
        int ret;

        ret = net_islocalcall(nid)
                ? stor_ctl_snapshot_rollback(fileid, name)
                : stor_rpc_snapshot_rollback(nid, fileid, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Roll fileid back to snapshot name on the node returned by
 * md_map_getsrv(fileid).
 *
 * @param fileid file to roll back
 * @param name   snapshot name
 * @return 0 on success, otherwise the error code of the failing call
 *
 * The first EREMCHG or ETIMEDOUT failure is retried once after calling
 * md_map_drop() and resolving the node again.
 */
int md_snapshot_rollback(const fileid_t *fileid, const char *name)
{
        int ret, attempt;
        nid_t srv;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(fileid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_snapshot_rollback(&srv, fileid, name);
                if (ret == 0)
                        break;

                /* anything but a first EREMCHG/ETIMEDOUT is final */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(fileid, &srv);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Dispatch a snapshot "flat" request: stor_ctl_snapshot_flat() when
 * net_islocalcall(nid) holds, stor_rpc_snapshot_flat() otherwise.
 * Returns 0 on success or the backend's error code.
 */
STATIC int __md_snapshot_flat(const nid_t *nid, const fileid_t *fileid, const int idx, int force)
{
        int ret;

        ret = net_islocalcall(nid)
                ? stor_ctl_snapshot_flat(fileid, idx, force)
                : stor_rpc_snapshot_flat(nid, fileid, idx, force);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Issue a snapshot "flat" request for fileid on the node returned by
 * md_map_getsrv(fileid).
 *
 * @param fileid file the request applies to
 * @param idx    index handed through to the backend
 * @param force  force flag handed through to the backend
 * @return 0 on success, otherwise the error code of the failing call
 *
 * The first EREMCHG or ETIMEDOUT failure is retried once after calling
 * md_map_drop() and resolving the node again.
 */
int md_snapshot_flat(const fileid_t *fileid, const int idx, int force)
{
        int ret, attempt;
        nid_t srv;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(fileid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_snapshot_flat(&srv, fileid, idx, force);
                if (ret == 0)
                        break;

                /* anything but a first EREMCHG/ETIMEDOUT is final */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(fileid, &srv);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Dispatch a snapshot remove request: stor_ctl_snapshot_remove() when
 * net_islocalcall(nid) holds, stor_rpc_snapshot_remove() otherwise.
 * Returns 0 on success or the backend's error code.
 */
STATIC int __md_snapshot_remove(const nid_t *nid, const fileid_t *fileid,
                                const char *name, int force)
{
        int ret;

        ret = net_islocalcall(nid)
                ? stor_ctl_snapshot_remove(fileid, name, force)
                : stor_rpc_snapshot_remove(nid, fileid, name, force);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Remove snapshot name of fileid on the node returned by
 * md_map_getsrv(fileid).
 *
 * @param fileid file the snapshot belongs to
 * @param name   snapshot name
 * @param force  force flag handed through to the backend
 * @return 0 on success, otherwise the error code of the failing call
 *
 * The first EREMCHG or ETIMEDOUT failure is retried once after calling
 * md_map_drop() and resolving the node again.
 */
int md_snapshot_remove(const fileid_t *fileid, const char *name, int force)
{
        int ret, attempt;
        nid_t srv;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(fileid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_snapshot_remove(&srv, fileid, name, force);
                if (ret == 0)
                        break;

                /* anything but a first EREMCHG/ETIMEDOUT is final */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(fileid, &srv);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Dispatch a snapshot protect request: stor_ctl_snapshot_protect() when
 * net_islocalcall(nid) holds, stor_rpc_snapshot_protect() otherwise.
 * Returns 0 on success or the backend's error code.
 */
STATIC int __md_snapshot_protect(const nid_t *nid, const fileid_t *fileid, const snap_protect_param_t on)
{
        int ret;

        ret = net_islocalcall(nid)
                ? stor_ctl_snapshot_protect(fileid, on)
                : stor_rpc_snapshot_protect(nid, fileid, on);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Apply the protect parameter on to fileid on the node returned by
 * md_map_getsrv(fileid).
 *
 * @param fileid file the request applies to
 * @param on     protect parameter handed through to the backend
 * @return 0 on success, otherwise the error code of the failing call
 *
 * The first EREMCHG or ETIMEDOUT failure is retried once after calling
 * md_map_drop() and resolving the node again.
 */
int md_snapshot_protect(const fileid_t *fileid, const snap_protect_param_t on)
{
        int ret, attempt;
        nid_t srv;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(fileid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_snapshot_protect(&srv, fileid, on);
                if (ret == 0)
                        break;

                /* anything but a first EREMCHG/ETIMEDOUT is final */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(fileid, &srv);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Fetch connection information for fileid from node nid:
 * stor_ctl_connection() when net_islocalcall(nid) holds,
 * stor_rpc_connection() otherwise. No retry is attempted.
 *
 * @param nid    node to ask
 * @param fileid file the query applies to
 * @param buf    output buffer handed through to the backend
 * @param count  count pointer handed through to the backend
 * @return 0 on success, otherwise the backend's error code
 */
int md_connection(const nid_t *nid, const fileid_t *fileid, void *buf, int *count)
{
        int ret;

        ret = net_islocalcall(nid)
                ? stor_ctl_connection(fileid, nid, buf, count)
                : stor_rpc_connection(nid, fileid, buf, count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Check whether a volume still has connections.
 *
 * The connection list is fetched into a zeroed MAX_BUF_LEN buffer from the
 * node returned by md_map_getsrv(fileid); a non-empty result is treated as
 * "still connected".
 *
 * @param fileid volume to check
 * @return 0 when the volume is not mounted (the buffer comes back empty),
 *         EPERM when it is, otherwise the error code of the failing call
 */
int md_check_connection(fileid_t *fileid)
{
        int ret, len = MAX_BUF_LEN;
        char *conns = NULL;
        nid_t srv;

        ret = md_map_getsrv(fileid, &srv);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ymalloc((void **)&conns, MAX_BUF_LEN);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(conns, 0x00, MAX_BUF_LEN);

        ret = net_islocalcall(&srv)
                ? stor_ctl_connection(fileid, &srv, conns, &len)
                : stor_rpc_connection(&srv, fileid, conns, &len);
        if (unlikely(ret))
                GOTO(err_free, ret);

        /* a non-empty string means at least one connection was reported */
        if (conns[0] != '\0') {
                DERROR("There are connections on this volume and you must remove them.\n");
                ret = EPERM;
                GOTO(err_free, ret);
        }

        yfree((void **)&conns);
        return 0;
err_free:
        yfree((void **)&conns);
err_ret:
        return ret;
}

/*
 * Dispatch a disconnect request: stor_ctl_disconnect() when
 * net_islocalcall(nid) holds, stor_rpc_disconnect() otherwise.
 * Returns 0 on success or the backend's error code.
 */
int __md_disconnect(const nid_t *nid, const fileid_t *fileid, const nid_t *peer, const char *addr)
{
        int ret;

        ret = net_islocalcall(nid)
                ? stor_ctl_disconnect(fileid, peer, addr)
                : stor_rpc_disconnect(nid, fileid, peer, addr);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Issue a disconnect request for fileid on the node returned by
 * md_map_getsrv(fileid).
 *
 * @param fileid file the request applies to
 * @param peer   peer node handed through to the backend
 * @param addr   address string handed through to the backend
 * @return 0 on success, otherwise the error code of the failing call
 *
 * The first EREMCHG or ETIMEDOUT failure is retried once after calling
 * md_map_drop() and resolving the node again.
 */
int md_disconnect(const fileid_t *fileid, const nid_t *peer, const char *addr)
{
        int ret, attempt;
        nid_t srv;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(fileid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_disconnect(&srv, fileid, peer, addr);
                if (ret == 0)
                        break;

                /* anything but a first EREMCHG/ETIMEDOUT is final */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(fileid, &srv);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Store a comma separated list as a series of extended attributes.
 *
 * value is split on ','; for key INITIATOR every '&' is first replaced by ','
 * (older format). The items are packed 256 / IQN_MAX_LEN per attribute, each
 * followed by a ',', and written under the names "<key>0", "<key>1", ...
 * Finally the item count is written as decimal text to INITIATOR_COUNT (key
 * INITIATOR) or LICH_SYSTEM_ATTR_USERCOUNT (key LICH_SYSTEM_ATTR_AUTH); for
 * any other key no count attribute is written.
 *
 * @param fileid file to modify
 * @param key    attribute family name
 * @param value  comma separated items
 * @return 0 on success; EPERM when there are more than 512 items or an item
 *         does not fit IQN_MAX_LEN; ENAMETOOLONG when a row or an attribute
 *         name does not fit its 256-byte buffer; otherwise the error code of
 *         the failing call
 *
 * All buffer writes are bounded: the row and the attribute name are built
 * with snprintf() and checked for truncation instead of relying on the item
 * and key lengths to keep strcpy/strcat/sprintf inside the buffers.
 */
int md_setattr_extend(const fileid_t *fileid, const char *key, const char *value)
{
        int ret, count, i, j, row, column, len;
        size_t used;
        char *tmp = NULL;
        char *items[1024];
        char buf[256] = {0}, keybuf[256] = {0};

        ret = ymalloc((void **)&tmp, strlen(value) + 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        strcpy(tmp, value);
        if (strcmp(key, INITIATOR) == 0) {
                _ch_replace(tmp, '&', ',');  //replace old version
        }

        count = 1024;
        _str_split(tmp, ',', items, &count);
        if (count > 512) {
                printf("\x1b[1;31m Maximum support for saving 512 records, persent %d.\x1b[0m\n", count);
                ret = EPERM;
                GOTO(err_free, ret);
        }
        for (i = 0; i < count; i++) {
                if (strlen(items[i]) + 1 > IQN_MAX_LEN) {
                        printf("\x1b[1;31m the length of item[%s] should be less than 72 characters.\x1b[0m\n", items[i]);
                        ret = EPERM;
                        GOTO(err_free, ret);
                }
        }

        YASSERT(IQN_MAX_LEN < 256);
        column = 256 / IQN_MAX_LEN;
        row = count % column ? (count / column + 1) : (count / column);
        for (i = 0; i < row; i++) {
                /* build "item,item,...," for this row without leaving buf */
                used = 0;
                buf[0] = '\0';
                for (j = 0; j < column && i * column + j < count; j++) {
                        len = snprintf(buf + used, sizeof(buf) - used, "%s,",
                                       items[i * column + j]);
                        if (len < 0 || (size_t)len >= sizeof(buf) - used) {
                                ret = ENAMETOOLONG;
                                GOTO(err_free, ret);
                        }
                        used += (size_t)len;
                }

                len = snprintf(keybuf, sizeof(keybuf), "%s%d", key, i);
                if (len < 0 || (size_t)len >= sizeof(keybuf)) {
                        ret = ENAMETOOLONG;
                        GOTO(err_free, ret);
                }

                ret = md_xattr_set(NULL, fileid, keybuf, buf, (int)used, 0);
                if (unlikely(ret))
                        GOTO(err_free, ret);
        }

        memset(buf, 0x00, sizeof(buf));
        snprintf(buf, sizeof(buf), "%d", count);
        if (strcmp(key, INITIATOR) == 0) {
                ret = md_xattr_set(NULL, fileid, INITIATOR_COUNT, buf, strlen(buf), 0);
                if (unlikely(ret))
                        GOTO(err_free, ret);
        } else if (strcmp(key, LICH_SYSTEM_ATTR_AUTH) == 0) {
                ret = md_xattr_set(NULL, fileid, LICH_SYSTEM_ATTR_USERCOUNT, buf, strlen(buf), 0);
                if (unlikely(ret))
                        GOTO(err_free, ret);
        }

        yfree((void **)&tmp);
        return 0;
err_free:
        yfree((void **)&tmp);
err_ret:
        return ret;
}

/**
 * Read back a list stored by md_setattr_extend().
 *
 * The item count is read from INITIATOR_COUNT (key INITIATOR) or
 * LICH_SYSTEM_ATTR_USERCOUNT (key LICH_SYSTEM_ATTR_AUTH); then the attributes
 * "<key>0", "<key>1", ... are read one after another and concatenated into
 * value. For INITIATOR the attribute named exactly key (older format) is
 * read as well and appended.
 *
 * @param fileid file to query
 * @param key    attribute family name; must be INITIATOR or
 *               LICH_SYSTEM_ATTR_AUTH
 * @param value  output buffer; every md_xattr_get() into it is given
 *               MAX_MSG_SIZE as length, so the caller must size it for the
 *               whole list -- NOTE(review): the required size is not
 *               checkable from here
 * @return 0 on success; EINVAL for an unsupported key (there is no counter
 *         attribute to look up, and the lookup name used to be left
 *         uninitialized); ENOKEY when neither the counter nor, for INITIATOR,
 *         the old-format attribute exists; otherwise the error code of the
 *         failing call
 */
int md_getattr_extend(const fileid_t *fileid, const char *key, char *value)
{
        int ret, i, len, buflen, count, row, column, enokey = 0, legacy;
        nid_t nid;
        const char *count_key;
        char buf[MAX_MSG_SIZE], _key[MAX_MSG_SIZE];
        char *_value = value;

        if (strcmp(key, INITIATOR) == 0) {
                count_key = INITIATOR_COUNT;
                legacy = 1;
        } else if (strcmp(key, LICH_SYSTEM_ATTR_AUTH) == 0) {
                count_key = LICH_SYSTEM_ATTR_USERCOUNT;
                legacy = 0;
        } else {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = md_map_getsrv(fileid, &nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* zero-fill: md_setattr_extend() stores the count without a
         * terminator and md_xattr_get() is not shown to add one */
        memset(buf, 0x00, sizeof(buf));
        buflen = MAX_MSG_SIZE;
        ret = md_xattr_get(&nid, fileid, count_key, buf, &buflen);
        if (unlikely(ret)) {
                if (ret != ENOKEY)
                        GOTO(err_ret, ret);

                /*** old version has no attr(INITIATOR_COUNT) ***/
                enokey++;
        } else {
                YASSERT(IQN_MAX_LEN < 256);
                buf[sizeof(buf) - 1] = '\0';
                count = atoi(buf);
                column = 256 / IQN_MAX_LEN;
                row = count % column ? (count / column + 1) : (count / column);
                for (i = 0; i < row; i++) {
                        len = snprintf(_key, sizeof(_key), "%s%d", key, i);
                        if (len < 0 || (size_t)len >= sizeof(_key)) {
                                ret = ENAMETOOLONG;
                                GOTO(err_ret, ret);
                        }

                        buflen = MAX_MSG_SIZE;
                        ret = md_xattr_get(&nid, fileid, _key, _value, &buflen);
                        if (unlikely(ret)) {
                                GOTO(err_ret, ret);
                        }
                        /* the next row is appended right after this one */
                        _value += strlen(_value);
                }
        }

        if (legacy) {
                /*** Getting old version attr(INITIATOR) ***/
                buflen = MAX_MSG_SIZE;
                ret = md_xattr_get(&nid, fileid, key, _value, &buflen);
                if (unlikely(ret)) {
                        if (ret != ENOKEY)
                                GOTO(err_ret, ret);

                        /* missing in both formats: nothing was stored at all */
                        enokey++;
                        if (enokey == 2)
                                GOTO(err_ret, ret);
                }
        } else if (enokey == 1) {
                /* no counter attribute and no old format to fall back to */
                ret = ENOKEY;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Remove a list stored by md_setattr_extend().
 *
 * The item count is read from INITIATOR_COUNT (key INITIATOR) or
 * LICH_SYSTEM_ATTR_USERCOUNT (key LICH_SYSTEM_ATTR_AUTH); then the attributes
 * "<key>0", "<key>1", ... and the counter attribute itself are removed. For
 * INITIATOR the attribute named exactly key (older format) is removed too.
 * Row attributes that are already missing are skipped.
 *
 * @param fileid file to modify
 * @param key    attribute family name; must be INITIATOR or
 *               LICH_SYSTEM_ATTR_AUTH
 * @return 0 on success; EINVAL for an unsupported key (the counter name used
 *         to be left uninitialized in that case); ENOKEY when neither the
 *         counter nor, for INITIATOR, the old-format attribute exists;
 *         otherwise the error code of the failing call
 *
 * A successful removal of the counter attribute now continues with the
 * old-format cleanup. Previously only ENOKEY continued and success jumped to
 * the error exit with ret == 0, so the old-format INITIATOR attribute was
 * never removed.
 */
int md_removeattr_extend(const fileid_t *fileid, const char *key)
{
        int ret, i, len, count, buflen = MAX_MSG_SIZE, column, row, enokey = 0, legacy;
        const char *count_key;
        char buf[MAX_MSG_SIZE], _key2[MAX_MSG_SIZE];
        nid_t nid;

        if (strcmp(key, INITIATOR) == 0) {
                count_key = INITIATOR_COUNT;
                legacy = 1;
        } else if (strcmp(key, LICH_SYSTEM_ATTR_AUTH) == 0) {
                count_key = LICH_SYSTEM_ATTR_USERCOUNT;
                legacy = 0;
        } else {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = md_map_getsrv(fileid, &nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* zero-fill: md_setattr_extend() stores the count without a
         * terminator and md_xattr_get() is not shown to add one */
        memset(buf, 0x00, sizeof(buf));
        ret = md_xattr_get(&nid, fileid, count_key, buf, &buflen);
        if (unlikely(ret)) {
                if (ret != ENOKEY) {
                        GOTO(err_ret, ret);
                }

                enokey++;
        } else {
                YASSERT(IQN_MAX_LEN < 256);
                buf[sizeof(buf) - 1] = '\0';
                count = atoi(buf);
                column = 256 / IQN_MAX_LEN;
                row = count % column ? (count / column + 1) : (count / column);
                for (i = 0; i < row; i++) {
                        len = snprintf(_key2, sizeof(_key2), "%s%d", key, i);
                        if (len < 0 || (size_t)len >= sizeof(_key2)) {
                                ret = ENAMETOOLONG;
                                GOTO(err_ret, ret);
                        }

                        ret = md_xattr_remove(&nid, fileid, _key2);
                        if (unlikely(ret) && ret != ENOKEY) {
                                GOTO(err_ret, ret);
                        }
                }

                ret = md_xattr_remove(&nid, fileid, count_key);
                if (unlikely(ret)) {
                        if (ret != ENOKEY) {
                                GOTO(err_ret, ret);
                        }

                        enokey++;
                }
        }

        if (legacy) {
                ret = md_xattr_remove(&nid, fileid, key);
                if (unlikely(ret)) {
                        if (ret != ENOKEY) {
                                GOTO(err_ret, ret);
                        }

                        /* missing in both formats: nothing was stored at all */
                        enokey++;
                        if (enokey == 2)
                                GOTO(err_ret, ret);
                }
        } else if (enokey == 1) {
                /* no counter attribute and no old format to fall back to */
                ret = ENOKEY;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Dispatch a snapshot "updateparent" request:
 * stor_ctl_snapshot_updateparent() when net_islocalcall(nid) holds,
 * stor_rpc_snapshot_updateparent() otherwise.
 * Returns 0 on success or the backend's error code.
 */
STATIC int __md_snapshot_updateparent(const nid_t *nid, const fileid_t *fileid, const char *name, const uint64_t from)
{
        int ret;

        ret = net_islocalcall(nid)
                ? stor_ctl_snapshot_updateparent(fileid, name, from)
                : stor_rpc_snapshot_updateparent(nid, fileid, name, from);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Public entry point for the snapshot "updateparent" request.
 *
 * The serving node for @fileid is resolved with md_map_getsrv() and the
 * request is dispatched through __md_snapshot_updateparent().  If that
 * first dispatch fails with EREMCHG or ETIMEDOUT, the cached mapping is
 * dropped with md_map_drop() and the whole sequence is repeated exactly
 * once.  Any other failure, or a failure on the second pass, is returned.
 *
 * Returns 0 on success, non-zero error code otherwise.
 */
int md_snapshot_updateparent(const fileid_t *fileid, const char *name, const uint64_t from)
{
        int ret, attempt;
        nid_t nid;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(fileid, &nid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_snapshot_updateparent(&nid, fileid, name, from);
                if (!ret)
                        return 0;

                /* only the first failure, and only these two codes, is retried */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(fileid, &nid);
        }

err_ret:
        return ret;
}

/*
 * Issue a snapshot "setfrom" request for @fileid against node @nid.
 *
 * net_islocalcall(nid) selects between the in-process
 * stor_ctl_snapshot_setfrom() and the remote stor_rpc_snapshot_setfrom();
 * @from is forwarded unchanged.
 *
 * Returns 0 on success, or the non-zero code reported by the callee.
 */
STATIC int __md_snapshot_setfrom(const nid_t *nid, const fileid_t *fileid, const uint64_t from)
{
        int ret;

        if (net_islocalcall(nid)) {
                /* in-process path */
                ret = stor_ctl_snapshot_setfrom(fileid, from);
                if (unlikely(ret != 0))
                        GOTO(err_ret, ret);

                return 0;
        }

        /* RPC path */
        ret = stor_rpc_snapshot_setfrom(nid, fileid, from);
        if (unlikely(ret != 0))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Public entry point for the snapshot "setfrom" request.
 *
 * Resolves the serving node for @fileid via md_map_getsrv(), then calls
 * __md_snapshot_setfrom().  A first-attempt failure of EREMCHG or
 * ETIMEDOUT causes the mapping to be dropped (md_map_drop()) and the
 * lookup + call to be repeated one more time; every other failure is
 * returned to the caller.
 *
 * Returns 0 on success, non-zero error code otherwise.
 */
int md_snapshot_setfrom(const fileid_t *fileid, const uint64_t from)
{
        int ret, attempt;
        nid_t nid;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(fileid, &nid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_snapshot_setfrom(&nid, fileid, from);
                if (!ret)
                        return 0;

                /* only the first failure, and only these two codes, is retried */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(fileid, &nid);
        }

err_ret:
        return ret;
}

/*
 * Run the "vfm cleanup" operation for every table index of @fileid.
 *
 * The file size is read with md_getattr(); the number of indices to visit
 * is chknum2tabnum(size2chknum(size, NULL)).  For each index a chunk id
 * is built from *fileid with .idx set to the index and .type set to
 * __VOLUME_SUB_CHUNK__, and the cleanup is issued through
 * stor_ctl_vfm_cleanup() when net_islocalcall(nid) is true, or through
 * stor_rpc_vfm_cleanup() otherwise.
 *
 * ENOENT from a single index is logged with DWARN and that index is
 * skipped; any other error aborts the loop and is returned.
 *
 * Returns 0 when the loop completes, non-zero error code otherwise.
 */
STATIC int __md_vfm_cleanup(const nid_t *nid, const fileid_t *fileid)
{
        int ret;
        uint32_t i, max, chknum;        /* index shares the bound's unsigned type */
        fileinfo_t fileinfo;
        chkid_t tid;

        ret = md_getattr(fileid, &fileinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        chknum = size2chknum(fileinfo.size, NULL);
        max = chknum2tabnum(chknum);

        for (i = 0; i < max; i++) {
                tid = *fileid;
                tid.idx = i;
                tid.type = __VOLUME_SUB_CHUNK__;

                if (net_islocalcall(nid))
                        ret = stor_ctl_vfm_cleanup(fileid, &tid);
                else
                        ret = stor_rpc_vfm_cleanup(nid, fileid, &tid);

                if (unlikely(ret)) {
                        if (ret != ENOENT) {
                                GOTO(err_ret, ret);
                        }

                        /* missing entry is tolerated: warn and move on */
                        DWARN("check "CHKID_FORMAT"\n", CHKID_ARG(&tid));
                }
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Public entry point for the "vfm cleanup" operation on @fileid.
 *
 * Resolves the serving node via md_map_getsrv() and runs
 * __md_vfm_cleanup() against it.  If the first run fails with EREMCHG or
 * ETIMEDOUT the mapping is dropped with md_map_drop() and lookup + run
 * are repeated once; all other failures are returned as-is.
 *
 * Returns 0 on success, non-zero error code otherwise.
 */
int md_vfm_cleanup(const fileid_t *fileid)
{
        int ret, attempt;
        nid_t nid;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(fileid, &nid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_vfm_cleanup(&nid, fileid);
                if (!ret)
                        return 0;

                /* only the first failure, and only these two codes, is retried */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(fileid, &nid);
        }

err_ret:
        return ret;
}

/*
 * Issue a snapshot "last" query for @fileid against node @nid.
 *
 * net_islocalcall(nid) selects between stor_ctl_snapshot_last() and
 * stor_rpc_snapshot_last().  The output pointers @snapnid, @snapid,
 * @snapname and @snap_version are handed straight to the callee; this
 * function does not touch them itself.
 *
 * Returns 0 on success, or the non-zero code reported by the callee.
 */
STATIC int __md_snapshot_last(const nid_t *nid, const fileid_t *fileid,
                nid_t *snapnid, fileid_t *snapid, char *snapname, uint64_t *snap_version)
{
        int ret;

        if (net_islocalcall(nid)) {
                /* in-process path */
                ret = stor_ctl_snapshot_last(fileid, snapnid, snapid, snapname, snap_version);
                if (unlikely(ret != 0))
                        GOTO(err_ret, ret);

                return 0;
        }

        /* RPC path */
        ret = stor_rpc_snapshot_last(nid, fileid, snapnid, snapid, snapname, snap_version);
        if (unlikely(ret != 0))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Public entry point for the snapshot "last" query on @fileid.
 *
 * Resolves the serving node via md_map_getsrv() and forwards the query,
 * together with the caller's output pointers, to __md_snapshot_last().
 * A first-attempt failure of EREMCHG or ETIMEDOUT drops the mapping
 * (md_map_drop()) and repeats lookup + query once; any other failure is
 * returned.
 *
 * Returns 0 on success, non-zero error code otherwise.
 */
int md_snapshot_last(const fileid_t *fileid, nid_t *snapnid, fileid_t *snapid, char *snapname, uint64_t *snap_version)
{
        int ret, attempt;
        nid_t nid;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(fileid, &nid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_snapshot_last(&nid, fileid, snapnid, snapid, snapname, snap_version);
                if (!ret)
                        return 0;

                /* only the first failure, and only these two codes, is retried */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(fileid, &nid);
        }

err_ret:
        return ret;
}

/*
 * Issue a snapshot "prev" query for (@fileid, @snapid) against node @nid.
 *
 * net_islocalcall(nid) selects between stor_ctl_snapshot_prev() and
 * stor_rpc_snapshot_prev().  The output pointers @previd, @name and
 * @snap_version are handed straight to the callee.
 *
 * Returns 0 on success, or the non-zero code reported by the callee.
 */
STATIC int __md_snapshot_prev(const nid_t *nid, const fileid_t *fileid, const fileid_t *snapid,
                fileid_t *previd, char *name, uint64_t *snap_version)
{
        int ret;

        if (net_islocalcall(nid)) {
                /* in-process path */
                ret = stor_ctl_snapshot_prev(fileid, snapid, previd, name, snap_version);
                if (unlikely(ret != 0))
                        GOTO(err_ret, ret);

                return 0;
        }

        /* RPC path */
        ret = stor_rpc_snapshot_prev(nid, fileid, snapid, previd, name, snap_version);
        if (unlikely(ret != 0))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Public entry point for the snapshot "prev" query on (@fileid, @snapid).
 *
 * Resolves the serving node for @fileid via md_map_getsrv() and forwards
 * the query to __md_snapshot_prev().  A first-attempt failure of EREMCHG
 * or ETIMEDOUT drops the mapping (md_map_drop()) and repeats lookup +
 * query once; any other failure is returned.
 *
 * Returns 0 on success, non-zero error code otherwise.
 */
int md_snapshot_prev(const fileid_t *fileid, const fileid_t *snapid, fileid_t *previd, char *name, uint64_t *snap_version)
{
        int ret, attempt;
        nid_t nid;

        for (attempt = 0; ; attempt++) {
                ret = md_map_getsrv(fileid, &nid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_snapshot_prev(&nid, fileid, snapid, previd, name, snap_version);
                if (!ret)
                        return 0;

                /* only the first failure, and only these two codes, is retried */
                if (attempt != 0 || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(fileid, &nid);
        }

err_ret:
        return ret;
}

/*
 * Initialise the metadata client layer.
 *
 * Calls, in this order: md_root_init(), md_map_init(), md_parent_init(),
 * then either stor_ctl_init(32768) when ng.daemon is set or
 * chunk_proto_init() when it is not, and finally stor_rpc_init().
 * The first step that reports a non-zero code stops the sequence; steps
 * that already succeeded are not undone here.
 *
 * Returns 0 when every step succeeds, otherwise the failing step's code.
 */
int md_init()
{
        int ret;

        ret = md_root_init();
        if (unlikely(ret != 0))
                GOTO(err_ret, ret);

        ret = md_map_init();
        if (unlikely(ret != 0))
                GOTO(err_ret, ret);

        ret = md_parent_init();
        if (unlikely(ret != 0))
                GOTO(err_ret, ret);

        /* the back end brought up depends on whether we run as a daemon */
        if (!ng.daemon) {
                ret = chunk_proto_init();
                if (unlikely(ret != 0))
                        GOTO(err_ret, ret);
        } else {
                ret = stor_ctl_init(32768);
                if (unlikely(ret != 0))
                        GOTO(err_ret, ret);
        }

        ret = stor_rpc_init();
        if (unlikely(ret != 0))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}
